package com.atguigu.gmall.realtime.app.demo;

import com.atguigu.gmall.realtime.util.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * Minimal Flink streaming demo: attaches the consumer returned by
 * {@code MyKafkaUtil.getKafkaConsumer} as a source and prints every
 * record of the resulting stream.
 */
public class DemoKafkaApp {

    /** Topic name handed to {@code MyKafkaUtil.getKafkaConsumer}. */
    private static final String TOPIC = "topic_log";

    /** Consumer group id handed to {@code MyKafkaUtil.getKafkaConsumer}. */
    private static final String GROUP_ID = "demo_kafka_app";

    /**
     * Builds the source-to-print pipeline and submits it for execution.
     *
     * @param args command-line arguments; not read by this job
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 2: register the consumer as the stream source.
        FlinkKafkaConsumer<String> logConsumer = MyKafkaUtil.getKafkaConsumer(TOPIC, GROUP_ID);

        // Step 3: print each record of the source stream.
        env.addSource(logConsumer).print();

        // Step 4: nothing runs until the environment is executed.
        env.execute();
    }
}
